package me.seu.demo.kafka;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import me.seu.demo.common.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * kafka生产者
 *
 * @author liangfeihu
 * @since 2020/3/9 12:01
 */
@Slf4j
@Component
@SuppressWarnings("ALL")
public class KafkaProducer {

    private final static int NUM = 3;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    /**
     * 发送消息方法
     */
    public void send() {
        for (int i = 0; i < NUM; i++) {
            Message message = new Message();
            message.setId(i);
            message.setMsg(UUID.randomUUID().toString().replace("-", "") + "---" + i);
            message.setSendTime(new Date());

            /*log.info("发送消息 ----->>>>>  message = {}", gson.toJson(message));*/

            log.info("同步发送消息 ----->>>>>  message = {}", JSON.toJSONString(message));
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("test_kafka", JSON.toJSONString(message));
            try {
                SendResult<String, String> sendResult = result.get();
                log.info("[send msg return result] {}", sendResult);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        } // end for
    } // end send method

}

